agentmux_srv\backend\wshutil/
cmdreader.rs1#![allow(dead_code)]
2use std::io::{BufRead, BufReader, Read};
13use tokio::sync::mpsc;
14
15pub struct CmdReader {
17 pub msg_tx: mpsc::Sender<Vec<u8>>,
19}
20
21impl CmdReader {
22 pub fn new(msg_tx: mpsc::Sender<Vec<u8>>) -> Self {
24 Self { msg_tx }
25 }
26
27 pub fn read_single_message(input: impl Read) -> Result<Vec<u8>, String> {
29 let mut reader = BufReader::new(input);
30 let mut line = String::new();
31 reader
32 .read_line(&mut line)
33 .map_err(|e| format!("read error: {}", e))?;
34
35 let trimmed = line.trim();
36 if trimmed.is_empty() {
37 return Err("empty input".to_string());
38 }
39
40 serde_json::from_str::<serde_json::Value>(trimmed)
42 .map_err(|e| format!("invalid JSON: {}", e))?;
43
44 Ok(trimmed.as_bytes().to_vec())
45 }
46
47 pub fn start_reading(
51 &self,
52 input: impl Read + Send + 'static,
53 ) -> std::thread::JoinHandle<Result<(), String>> {
54 let tx = self.msg_tx.clone();
55 std::thread::spawn(move || {
56 let reader = BufReader::new(input);
57 for line in reader.lines() {
58 let line = line.map_err(|e| format!("read error: {}", e))?;
59 let trimmed = line.trim().to_string();
60 if trimmed.is_empty() {
61 continue;
62 }
63
64 if serde_json::from_str::<serde_json::Value>(&trimmed).is_err() {
66 tracing::warn!("skipping invalid JSON line: {}", &trimmed[..trimmed.len().min(100)]);
67 continue;
68 }
69
70 tx.blocking_send(trimmed.into_bytes())
71 .map_err(|e| format!("channel send error: {}", e))?;
72 }
73 Ok(())
74 })
75 }
76
77 pub fn read_all(input: impl Read) -> Result<Vec<u8>, String> {
80 let mut buf = String::new();
81 let mut reader = BufReader::new(input);
82 reader
83 .read_to_string(&mut buf)
84 .map_err(|e| format!("read error: {}", e))?;
85
86 let trimmed = buf.trim();
87 if trimmed.is_empty() {
88 return Err("empty input".to_string());
89 }
90
91 Ok(trimmed.as_bytes().to_vec())
92 }
93}
94
95#[cfg(test)]
96mod tests {
97 use super::*;
98 use std::io::Cursor;
99
100 #[test]
101 fn test_read_single_message() {
102 let input = Cursor::new(b"{\"command\":\"test\"}\n");
103 let msg = CmdReader::read_single_message(input).unwrap();
104 assert_eq!(String::from_utf8(msg).unwrap(), "{\"command\":\"test\"}");
105 }
106
107 #[test]
108 fn test_read_single_message_invalid_json() {
109 let input = Cursor::new(b"not json\n");
110 let result = CmdReader::read_single_message(input);
111 assert!(result.is_err());
112 assert!(result.unwrap_err().contains("invalid JSON"));
113 }
114
115 #[test]
116 fn test_read_single_message_empty() {
117 let input = Cursor::new(b"\n");
118 let result = CmdReader::read_single_message(input);
119 assert!(result.is_err());
120 }
121
122 #[test]
123 fn test_read_all() {
124 let input = Cursor::new(b"{\"data\": \"hello world\"}");
125 let msg = CmdReader::read_all(input).unwrap();
126 assert_eq!(
127 String::from_utf8(msg).unwrap(),
128 "{\"data\": \"hello world\"}"
129 );
130 }
131
132 #[tokio::test]
133 async fn test_start_reading() {
134 let (tx, mut rx) = mpsc::channel(10);
135 let reader = CmdReader::new(tx);
136
137 let input = Cursor::new(b"{\"cmd\":\"a\"}\n{\"cmd\":\"b\"}\nnot-json\n{\"cmd\":\"c\"}\n");
138 let handle = reader.start_reading(input);
139
140 let msg1 = rx.recv().await.unwrap();
141 assert_eq!(String::from_utf8(msg1).unwrap(), "{\"cmd\":\"a\"}");
142
143 let msg2 = rx.recv().await.unwrap();
144 assert_eq!(String::from_utf8(msg2).unwrap(), "{\"cmd\":\"b\"}");
145
146 let msg3 = rx.recv().await.unwrap();
148 assert_eq!(String::from_utf8(msg3).unwrap(), "{\"cmd\":\"c\"}");
149
150 handle.join().unwrap().unwrap();
151 }
152}